{%- set kafka_password = salt['pillar.get']('kafka:config:password') %}
{%- set kafka_trustpass = salt['pillar.get']('kafka:config:trustpass') %}
{%- set kafka_brokers = salt['pillar.get']('kafka:nodes', {}) %}
{%- set brokers = [] %}

{%- if kafka_brokers %}
{%-   for key, values in kafka_brokers.items() %}
{%-     if 'broker' in values['role'] %}
{%-       do brokers.append(key ~ ':9092') %}
{%-     endif %}
{%-   endfor %}
{%-   set bootstrap_servers = ','.join(brokers) %}

input {
    kafka {
        codec => json
        topics_pattern => '.*-securityonion$'
        group_id => 'searchnodes'
        consumer_threads => 3
        client_id => '{{ GLOBALS.hostname }}'
        security_protocol => 'SSL'
        bootstrap_servers => '{{ bootstrap_servers }}'
        ssl_keystore_location => '/usr/share/logstash/kafka-logstash.p12'
        ssl_keystore_password => '{{ kafka_password }}'
        ssl_keystore_type => 'PKCS12'
        ssl_truststore_location => '/etc/pki/kafka-truststore.jks'
        ssl_truststore_password => '{{ kafka_trustpass }}'
        decorate_events => true
        tags => [ "elastic-agent", "input-{{ GLOBALS.hostname}}", "kafka" ]
    }
}
filter {
  if ![metadata] {
    mutate {
      rename => { "@metadata" => "metadata" }
    }
  }
}
{% endif %}